{-|
= Scheduled Triggers

This module implements the functionality of invoking webhooks during specified
time events aka scheduled events. The scheduled events are the events generated
by the graphql-engine using the cron triggers or/and a scheduled event can
be created by the user at a specified time with the payload, webhook, headers
and the retry configuration. Scheduled events are modeled using rows in Postgres
with a @timestamp@ column.

This module implements scheduling and delivery of scheduled
events:

1. Scheduling a cron event involves creating new cron events. New
cron events are created based on the cron schedule and the number of
scheduled events that are already present in the scheduled events buffer.
The graphql-engine computes the new scheduled events and writes them to
the database. (Generator)

2. Delivering a scheduled event involves reading undelivered scheduled events
from the database and delivering them to the webhook server. (Processor)

The rationale behind separating the event scheduling and event delivery
mechanism into two different threads is that the scheduling and delivering of
the scheduled events are not directly dependent on each other. The generator
will almost always try to create scheduled events which are supposed to be
delivered in the future (timestamp > current_timestamp) and the processor
will fetch scheduled events of the past (timestamp < current_timestamp). So,
the set of the scheduled events generated by the generator and the processor
will never be the same. The point here is that they're not correlated to each
other. They can be split into different threads for a better performance.

== Implementation

The scheduled triggers eventing is implemented in the metadata storage.
All functions that interact with the storage system are abstracted in
the 'MonadMetadataStorage' class.

During the startup, two threads are started:

1. Generator: Fetches the list of scheduled triggers from cache and generates
   the scheduled events.

    - Additional events will be generated only if there are fewer than 100
      scheduled events.

    - The upcoming events timestamp will be generated using:

        - cron schedule of the scheduled trigger

        - max timestamp of the scheduled events that already exist or
          current_timestamp(when no scheduled events exist)

        - The timestamp of the scheduled events is stored with timezone because
          `SELECT NOW()` returns timestamp with timezone, so it's good to
          compare two things of the same type.

    This effectively corresponds to doing an INSERT with values containing
    specific timestamp.

2. Processor: Fetches from the database the undelivered cron events and
   scheduled events whose timestamp is earlier than the current timestamp,
   and then processes them.
-}
module Hasura.Eventing.ScheduledTrigger
  ( runCronEventsGenerator
  , processScheduledTriggers
  , generateScheduleTimes

  , CronEventSeed(..)
  , initLockedEventsCtx
  , LockedEventsCtx(..)

  -- * Database interactions
  -- Following function names are similar to those present in
  -- 'MonadMetadataStorage' type class. To avoid duplication,
  -- 'Tx' is suffixed to identify as database transactions
  , getDeprivedCronTriggerStatsTx
  , getScheduledEventsForDeliveryTx
  , insertInvocationTx
  , setScheduledEventOpTx
  , unlockScheduledEventsTx
  , unlockAllLockedScheduledEventsTx
  , insertScheduledEventTx
  , dropFutureCronEventsTx
  , getOneOffScheduledEventsTx
  , getCronEventsTx
  , deleteScheduledEventTx
  , getInvocationsTx

  -- * Export utility functions which are useful to build
  -- SQLs for fetching data from metadata storage
  , mkScheduledEventStatusFilter
  , scheduledTimeOrderBy
  , mkPaginationSelectExp
  , withCount
  , invocationFieldExtractors
  , mkEventIdBoolExp
  ) where

import           Hasura.Prelude

import qualified Data.Aeson                             as J
import qualified Data.ByteString.Lazy                   as BL
import qualified Data.Environment                       as Env
import qualified Data.HashMap.Strict                    as Map
import qualified Data.List.NonEmpty                     as NE
import qualified Data.Set                               as Set
import qualified Data.TByteString                       as TBS
import qualified Data.Text                              as T
import qualified Database.PG.Query                      as Q
import qualified Network.HTTP.Client                    as HTTP
import qualified Text.Builder                           as TB

import           Control.Arrow.Extended                 (dup)
import           Control.Concurrent.Extended            (sleep)
import           Control.Concurrent.STM.TVar
import           Data.Has
import           Data.Int                               (Int64)
import           Data.List                              (unfoldr)
import           Data.Time.Clock
import           System.Cron

import qualified Hasura.Backends.Postgres.SQL.DML       as S
import qualified Hasura.Logging                         as L
import qualified Hasura.Tracing                         as Tracing

import           Hasura.Backends.Postgres.SQL.Types
import           Hasura.Eventing.Common
import           Hasura.Eventing.HTTP
import           Hasura.Eventing.ScheduledTrigger.Types
import           Hasura.HTTP
import           Hasura.Metadata.Class
import           Hasura.RQL.DDL.EventTrigger            (getHeaderInfosFromConf)
import           Hasura.RQL.DDL.Headers
import           Hasura.RQL.Types
import           Hasura.Server.Version                  (HasVersion)
import           Hasura.SQL.Types

-- | Keeps the cron triggers supplied with a buffer of upcoming cron events.
--
-- Loops forever. Each iteration reads the cron triggers from the schema
-- cache, fetches the stats reported by 'getDeprivedCronTriggerStats',
-- generates new cron events for every reported trigger that is still present
-- in the cache, and then sleeps for one minute. A failure of the metadata
-- storage action is logged and the loop carries on.
runCronEventsGenerator
  :: ( MonadIO m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => L.Logger L.Hasura
  -> IO SchemaCache
  -> m void
runCronEventsGenerator logger getSC = forever $ do
  cronTriggers <- scCronTriggers <$> liftIO getSC

  outcome <- runMetadataStorageT $ do
    deprivedStats <- getDeprivedCronTriggerStats
    -- pair every stat with its trigger definition; stats whose trigger is
    -- missing from the cache are logged and dropped
    triggersWithStats <-
      catMaybes <$> mapM (pairWithTrigger cronTriggers) deprivedStats
    insertCronEventsFor triggersWithStats

  onLeft outcome $ \storageErr ->
    L.unLogger logger $
      ScheduledTriggerInternalErr $ err500 Unexpected $ tshow storageErr

  liftIO $ sleep (minutes 1)
  where
    -- looks the stat's trigger up in the cache, logging when it is absent
    pairWithTrigger triggerCache stat =
      case Map.lookup (ctsName stat) triggerCache of
        Just trigger -> pure $ Just (trigger, stat)
        Nothing -> do
          L.unLogger logger $
            ScheduledTriggerInternalErr $
              err500 Unexpected "could not find scheduled trigger in the schema cache"
          pure Nothing

-- | Generates new cron events for each given trigger, starting from the
-- maximum scheduled time recorded in that trigger's stats, and inserts all of
-- them with a single 'insertScheduledEvent' call. Nothing is inserted when no
-- events were generated.
insertCronEventsFor
  :: (MonadMetadataStorage m)
  => [(CronTriggerInfo, CronTriggerStats)]
  -> m ()
insertCronEventsFor cronTriggersWithStats =
  if null newSeeds
    then pure ()
    else insertScheduledEvent $ SESCron newSeeds
  where
    newSeeds =
      [ seed
      | (triggerInfo, stats) <- cronTriggersWithStats
      , seed <- generateCronEventsFrom (ctsMaxScheduledTime stats) triggerInfo
      ]

-- | Builds the seeds for up to 100 upcoming events of a cron trigger. The
-- scheduled times come from 'generateScheduleTimes', applied to the trigger's
-- cron schedule and the given start time.
generateCronEventsFrom :: UTCTime -> CronTriggerInfo-> [CronEventSeed]
generateCronEventsFrom startTime cronTrigger =
  [ CronEventSeed (ctiName cronTrigger) scheduledAt
  | scheduledAt <- generateScheduleTimes startTime 100 (ctiSchedule cronTrigger)
  ]

-- | Generates at most @n@ schedule times for the given 'CronSchedule'. The
-- first time is 'nextMatch' applied to @from@; every following time is
-- 'nextMatch' applied to the previous result. The list ends early if
-- 'nextMatch' returns 'Nothing'.
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n (unfoldr step from)
  where
    -- each match is both emitted and used as the seed for the next lookup
    step current = do
      next <- nextMatch cron current
      pure (next, next)

-- | Delivers a batch of cron events that were fetched for delivery.
--
-- The ids of all events are first recorded in the given 'TVar' so that they
-- can be unlocked if a graceful shutdown is initiated while they are being
-- processed. Each event is then matched with its cron trigger in the schema
-- cache. When the trigger is found, the event is processed, its id is removed
-- from the locked set, and any error from the storage action is logged. When
-- the trigger is not found, only an internal error is logged.
processCronEvents
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> [CronEvent]
  -> IO SchemaCache
  -> TVar (Set.Set CronEventId)
  -> m ()
processCronEvents logger logEnv httpMgr cronEvents getSC lockedCronEvents = do
  schemaCache <- liftIO getSC
  let triggersByName = scCronTriggers schemaCache
  saveLockedEvents (_ceId <$> cronEvents) lockedCronEvents
  for_ cronEvents $ \(CronEvent eventId triggerName scheduledAt _ tries _ _) ->
    case Map.lookup triggerName triggersByName of
      Just CronTriggerInfo{..} -> do
        let retryCtx = RetryContext tries ctiRetryConf
            webhookUrl = unResolvedWebhook ctiWebhookInfo
            -- the final payload field is left as 'Nothing' for cron events
            payload =
              ScheduledEventWebhookPayload
                eventId
                (Just triggerName)
                scheduledAt
                (fromMaybe J.Null ctiPayload)
                ctiComment
                Nothing
        outcome <-
          runMetadataStorageT $
            runReaderT
              (processScheduledEvent logEnv eventId ctiHeaders retryCtx
                                     payload webhookUrl Cron)
              (logger, httpMgr)
        removeEventFromLockedEvents eventId lockedCronEvents
        onLeft outcome reportInternalError
      Nothing ->
        reportInternalError $
          err500 Unexpected "could not find cron trigger in cache"
  where
    reportInternalError err =
      liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

-- | Delivers a batch of one-off scheduled events that were fetched for
-- delivery.
--
-- The ids of all events are first recorded in the given 'TVar' so that they
-- can be unlocked if a graceful shutdown is initiated while they are being
-- processed. For each event, the webhook and headers are resolved from the
-- environment, the event is processed, and its id is removed from the locked
-- set. All of this happens inside one metadata storage action, so the removal
-- is the last step of that action; an error from the action is logged.
processOneOffScheduledEvents
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> [OneOffScheduledEvent]
  -> TVar (Set.Set OneOffScheduledEventId)
  -> m ()
processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents lockedOneOffScheduledEvents = do
  saveLockedEvents (_ooseId <$> oneOffEvents) lockedOneOffScheduledEvents
  for_ oneOffEvents $ \OneOffScheduledEvent{..} -> do
    outcome <- runMetadataStorageT $ do
      webhookInfo <- resolveWebhook env _ooseWebhookConf
      headerInfo <- getHeaderInfosFromConf env _ooseHeaderConf
      let retryCtx = RetryContext _ooseTries _ooseRetryConf
          webhookUrl = unResolvedWebhook webhookInfo
          payload =
            ScheduledEventWebhookPayload
              _ooseId
              Nothing
              _ooseScheduledTime
              (fromMaybe J.Null _oosePayload)
              _ooseComment
              (Just _ooseCreatedAt)
      runReaderT
        (processScheduledEvent logEnv _ooseId headerInfo retryCtx payload webhookUrl OneOff)
        (logger, httpMgr)
      removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
    onLeft outcome reportInternalError
  where
    reportInternalError err =
      liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

-- | The delivery loop for scheduled events (the \"Processor\").
--
-- Loops forever. Each iteration fetches the cron events and one-off scheduled
-- events that are due for delivery via 'getScheduledEventsForDelivery',
-- processes both batches, and then sleeps for one minute. A failure to fetch
-- the events is logged as an internal error.
--
-- The two 'TVar's in the 'LockedEventsCtx' are passed on to the batch
-- processors, which use them to track the events currently being processed.
processScheduledTriggers
  :: ( HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage (MetadataStorageT m)
     )
  => Env.Environment
  -> L.Logger L.Hasura
  -> LogEnvHeaders
  -> HTTP.Manager
  -> IO SchemaCache
  -> LockedEventsCtx
  -> m void
processScheduledTriggers env logger logEnv httpMgr getSC LockedEventsCtx {..} =
  forever $ do
    result <- runMetadataStorageT getScheduledEventsForDelivery
    case result of
      Left e -> logInternalError e
      Right (cronEvents, oneOffEvents) -> do
        processCronEvents logger logEnv httpMgr cronEvents getSC leCronEvents
        processOneOffScheduledEvents env logger logEnv httpMgr oneOffEvents leOneOffEvents
    -- Sleep after every iteration, including one whose fetch failed. Without
    -- the pause, a persistent storage failure would make this loop spin,
    -- retrying the fetch and logging the error as fast as it can.
    liftIO $ sleep (minutes 1)
  where
    logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err

-- | Processes a single scheduled event and records the outcome.
--
-- The whole action runs under 'Tracing.runTraceT' with a note that includes
-- the trigger name when the payload carries one. If the time elapsed since
-- the event's scheduled time exceeds the tolerance of the retry
-- configuration, the event is marked dead and the webhook is not called.
-- Otherwise the JSON-encoded payload is sent with 'tryWebhook' using the
-- configured timeout, the attempt is logged, and the result is handed to
-- 'processError' or 'processSuccess'.
processScheduledEvent
  :: ( MonadReader r m
     , Has HTTP.Manager r
     , Has (L.Logger L.Hasura) r
     , HasVersion
     , MonadIO m
     , Tracing.HasReporter m
     , MonadMetadataStorage m
     )
  => LogEnvHeaders
  -> ScheduledEventId
  -> [EventHeaderInfo]
  -> RetryContext
  -> ScheduledEventWebhookPayload
  -> Text
  -> ScheduledEventType
  -> m ()
processScheduledEvent logEnv eventId eventHeaders retryCtx payload webhookUrl type' =
  Tracing.runTraceT traceNote $ do
    now <- liftIO getCurrentTime
    let retryConf = _rctxConf retryCtx
        pastTolerance =
          convertDuration (diffUTCTime now (sewpScheduledTime payload))
            > unNonNegativeDiffTime (strcToleranceSeconds retryConf)
    if pastTolerance
      then processDead eventId type'
      else do
        let httpTimeout =
              HTTP.responseTimeoutMicro $
                round (unNonNegativeDiffTime $ strcTimeoutSeconds retryConf) * 1000000
            requestHeaders = addDefaultHeaders $ encodeHeader <$> eventHeaders
            requestJson = J.toJSON payload
            requestBody = J.encode requestJson
        webhookResult <- runExceptT $
          tryWebhook requestHeaders httpTimeout requestBody (T.unpack webhookUrl)
        logHTTPForST
          webhookResult
          (ExtraLogContext (Just now) eventId)
          (RequestDetails $ BL.length requestBody)
        -- headers in the form stored with the invocation log
        let loggedHeaders = decodeHeader logEnv eventHeaders <$> requestHeaders
        either
          (processError eventId retryCtx loggedHeaders type' requestJson)
          (processSuccess eventId loggedHeaders type' requestJson)
          webhookResult
  where
    traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)

-- | Records a failed webhook attempt and decides what happens to the event
-- next.
--
-- An invocation is built from the error and inserted with
-- 'insertScheduledEventInvocation'; then 'retryOrMarkError' is called. The
-- status stored in the invocation depends on the kind of error:
--
--   * 'HClient': status 1000, with the shown exception as the response body
--   * 'HParse': status 1001, with the parse detail as the response body
--   * 'HStatus': the status, body and headers of the webhook's response
--   * 'HOther': status 500, with the detail as the response body
processError
  :: ( MonadIO m
     , MonadMetadataStorage m
     )
  => ScheduledEventId
  -> RetryContext
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPErr a
  -> m ()
processError eventId retryCtx decodedHeaders type' reqJson err = do
  insertScheduledEventInvocation invocation type'
  retryOrMarkError eventId retryCtx err type'
  where
    jsonBody :: J.ToJSON v => v -> TBS.TByteString
    jsonBody = TBS.fromLBS . J.encode

    -- an invocation for a failure that has no webhook response headers
    failedWith status body =
      mkInvocation eventId status decodedHeaders body [] reqJson

    invocation = case err of
      HClient excp    -> failedWith 1000 $ jsonBody $ show excp
      HParse _ detail -> failedWith 1001 $ jsonBody detail
      HStatus errResp ->
        mkInvocation eventId (hrsStatus errResp) decodedHeaders
                     (hrsBody errResp) (hrsHeaders errResp) reqJson
      HOther detail   -> failedWith 500 $ jsonBody detail

-- | Either schedules another attempt for a failed event or marks it as
-- errored.
--
-- The event is set to the error status only when the number of tries has
-- reached the configured number of retries /and/ no retry-after value could
-- be parsed from the error. In every other case a retry is scheduled: the
-- delay is the parsed retry-after value when there is one, otherwise the
-- configured retry interval, and it is added to the current time.
retryOrMarkError
  :: (MonadIO m, MonadMetadataStorage m)
  => ScheduledEventId
  -> RetryContext
  -> HTTPErr a
  -> ScheduledEventType
  -> m ()
retryOrMarkError eventId (RetryContext tries retryConf) err type'
  | outOfRetries && isNothing retryAfterSeconds =
      setScheduledEventOp eventId (SEOpStatus SESError) type'
  | otherwise = do
      now <- liftIO getCurrentTime
      let delaySeconds =
            fromMaybe
              (round $ unNonNegativeDiffTime $ strcRetryIntervalSeconds retryConf)
              retryAfterSeconds
          retryAt = addUTCTime (fromIntegral delaySeconds) now
      setScheduledEventOp eventId (SEOpRetry retryAt) type'
  where
    outOfRetries = tries >= strcNumRetries retryConf
    retryAfterSeconds = getRetryAfterHeaderFromHTTPErr err >>= parseRetryHeaderValue

{- Note [Scheduled event lifecycle]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Scheduled events move between five different states over the course of their
lifetime, as represented by the following flowchart:
  ┌───────────┐      ┌────────┐      ┌───────────┐
  │ scheduled │─(a)─→│ locked │─(b)─→│ delivered │
  └───────────┘      └────────┘      └───────────┘
          ↑              │           ┌───────┐
          └────(c)───────┼─────(d)──→│ error │
                         │           └───────┘
                         │           ┌──────┐
                         └─────(e)──→│ dead │
                                     └──────┘

When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
  a. When graphql-engine fetches a scheduled event from the database to process
     it, it sets its state to 'locked'. This prevents multiple graphql-engine
     instances running on the same database from processing the same
     scheduled event concurrently.
  b. When a scheduled event is processed successfully, it is marked 'delivered'.
  c. If a scheduled event fails to be processed, but it hasn’t yet reached
     its maximum retry limit, its retry counter is incremented and
     it is returned to the 'scheduled' state.
  d. If a scheduled event fails to be processed and *has* reached its
     retry limit, its state is set to 'error'.
  e. If for whatever reason the difference between the current time and the
     scheduled time is greater than the tolerance of the scheduled event, it
     will not be processed and its state will be set to 'dead'.
-}

-- | Records a successful webhook attempt: inserts an invocation built from
-- the webhook's response and then sets the event's status to delivered.
processSuccess
  :: (MonadMetadataStorage m)
  => ScheduledEventId
  -> [HeaderConf]
  -> ScheduledEventType
  -> J.Value
  -> HTTPResp a
  -> m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp = do
  insertScheduledEventInvocation invocation type'
  setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
  where
    invocation =
      mkInvocation eventId (hrsStatus resp) decodedHeaders
                   (hrsBody resp) (hrsHeaders resp) reqBodyJson

-- | Sets the status of the given event to dead.
processDead
  :: (MonadMetadataStorage m)
  => ScheduledEventId -> ScheduledEventType -> m ()
processDead eventId = setScheduledEventOp eventId (SEOpStatus SESDead)

-- | Builds the invocation record for one delivery attempt.
--
-- The request part is built from the request body and headers with
-- 'mkWebhookReq'. The response part is built with 'mkClientErr' from the
-- body alone when 'isClientError' holds for the status, and with 'mkResp'
-- from the status, body and response headers otherwise.
mkInvocation
  :: ScheduledEventId
  -> Int
  -> [HeaderConf]
  -> TBS.TByteString
  -> [HeaderConf]
  -> J.Value
  -> (Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
  Invocation eventId status webhookReq response
  where
    webhookReq = mkWebhookReq reqBodyJson reqHeaders invocationVersionST
    response
      | isClientError status = mkClientErr respBody
      | otherwise = mkResp status respBody respHeaders

-- metadata database transactions

-- | Fetches the stats of the cron triggers that have fewer than 100 upcoming
-- events. Only events in the @scheduled@ status with zero tries are counted.
-- Each result carries the trigger name, the number of such events and the
-- maximum scheduled time among them.
getDeprivedCronTriggerStatsTx :: Q.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx = do
  statRows <- Q.listQE defaultTxErrorHandler
    [Q.sql|
     SELECT * FROM
      ( SELECT
         trigger_name,
          count(*) as upcoming_events_count,
          max(scheduled_time) as max_scheduled_time
         FROM hdb_catalog.hdb_cron_events
         WHERE tries = 0 and status = 'scheduled'
         GROUP BY trigger_name
      ) AS q
      WHERE q.upcoming_events_count < 100
     |] () True
  pure
    [ CronTriggerStats triggerName upcomingCount maxScheduledTime
    | (triggerName, upcomingCount, maxScheduledTime) <- statRows
    ]

-- | Fetches the cron events and the one-off scheduled events that are due for
-- delivery, marking them as @locked@ in the same statement.
--
-- An event is due when its status is @scheduled@ and either it has no
-- @next_retry_at@ and its @scheduled_time@ has passed, or its @next_retry_at@
-- has passed. Candidate rows are selected with @FOR UPDATE SKIP LOCKED@. The
-- cron events are fetched first, then the one-off events.
getScheduledEventsForDeliveryTx :: Q.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx = do
  cronEvents <- lockDueCronEvents
  oneOffEvents <- lockDueOneOffEvents
  pure (cronEvents, oneOffEvents)
  where
    lockDueCronEvents :: Q.TxE QErr [CronEvent]
    lockDueCronEvents = do
      lockedRows <- Q.listQE defaultTxErrorHandler [Q.sql|
        WITH cte AS
          ( UPDATE hdb_catalog.hdb_cron_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_cron_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
        SELECT row_to_json(t.*) FROM cte AS t
      |] () True
      pure $ map (Q.getAltJ . runIdentity) lockedRows

    lockDueOneOffEvents :: Q.TxE QErr [OneOffScheduledEvent]
    lockDueOneOffEvents = do
      lockedRows <- Q.listQE defaultTxErrorHandler [Q.sql|
         WITH cte AS (
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = 'locked'
            WHERE id IN ( SELECT t.id
                          FROM hdb_catalog.hdb_scheduled_events t
                          WHERE ( t.status = 'scheduled'
                                  and (
                                   (t.next_retry_at is NULL and t.scheduled_time <= now()) or
                                   (t.next_retry_at is not NULL and t.next_retry_at <= now())
                                  )
                                )
                          FOR UPDATE SKIP LOCKED
                          )
            RETURNING *
          )
         SELECT row_to_json(t.*) FROM cte AS t
      |] () False
      pure $ map (Q.getAltJ . runIdentity) lockedRows

-- | Stores the invocation log of a delivery attempt and then increments the
-- @tries@ counter of the event it belongs to. Cron events and one-off events
-- use separate log and event tables.
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> Q.TxE QErr ()
insertInvocationTx invo type' =
  case type' of
    Cron -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |] logRow True
      Q.unitQE defaultTxErrorHandler [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET tries = tries + 1
          WHERE id = $1
          |] eventIdParam True
    OneOff -> do
      Q.unitQE defaultTxErrorHandler
        [Q.sql|
         INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
         (event_id, status, request, response)
         VALUES ($1, $2, $3, $4)
        |] logRow True
      Q.unitQE defaultTxErrorHandler [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET tries = tries + 1
          WHERE id = $1
          |] eventIdParam True
  where
    eventIdParam = Identity $ iEventId invo
    -- parameters of the log insert, in the order of the column list
    logRow =
      ( iEventId invo
      , fromIntegral (iStatus invo) :: Int64
      , Q.AltJ $ J.toJSON $ iRequest invo
      , Q.AltJ $ J.toJSON $ iResponse invo
      )

-- | Applies an operation to a scheduled event.
--
--   * 'SEOpRetry': sets @next_retry_at@ to the given time and puts the event
--     back into the @scheduled@ status.
--   * 'SEOpStatus': sets the event's status to the given status.
--
-- The event type selects between the cron events table and the one-off
-- scheduled events table.
setScheduledEventOpTx
  :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> Q.TxE QErr ()
setScheduledEventOpTx eventId op type' = case (op, type') of
  (SEOpRetry time, Cron) ->
    Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |] (time, eventId) True
  (SEOpRetry time, OneOff) ->
    Q.unitQE defaultTxErrorHandler [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET next_retry_at = $1,
            STATUS = 'scheduled'
            WHERE id = $2
            |] (time, eventId) True
  (SEOpStatus status, Cron) ->
    Q.unitQE defaultTxErrorHandler
           [Q.sql|
            UPDATE hdb_catalog.hdb_cron_events
            SET status = $2
            WHERE id = $1
           |] (eventId, status) True
  (SEOpStatus status, OneOff) ->
    Q.unitQE defaultTxErrorHandler
           [Q.sql|
            UPDATE hdb_catalog.hdb_scheduled_events
            SET status = $2
            WHERE id = $1
           |] (eventId, status) True

-- | Puts the given events back into the @scheduled@ status, touching only
-- those that are currently @locked@, and returns how many rows were updated.
-- The event type selects the table to update.
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> Q.TxE QErr Int
unlockScheduledEventsTx type' eventIds = do
  countRow <- case type' of
    Cron -> unlockWith
      [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_cron_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) and status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
    OneOff -> unlockWith
      [Q.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.hdb_scheduled_events
        SET status = 'scheduled'
        WHERE id = ANY($1::text[]) AND status = 'locked'
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
  pure $ runIdentity $ Q.getRow countRow
  where
    -- runs an unlock statement with the event ids as its only parameter
    unlockWith unlockQuery =
      Q.withQE defaultTxErrorHandler unlockQuery
        (Identity $ ScheduledEventIdArray eventIds) True

-- | Puts every @locked@ event back into the @scheduled@ status, first in the
-- cron events table and then in the one-off scheduled events table.
unlockAllLockedScheduledEventsTx :: Q.TxE QErr ()
unlockAllLockedScheduledEventsTx =
  for_ unlockQueries $ \unlockQuery ->
    Q.unitQE defaultTxErrorHandler unlockQuery () True
  where
    unlockQueries =
      [ [Q.sql|
          UPDATE hdb_catalog.hdb_cron_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
      , [Q.sql|
          UPDATE hdb_catalog.hdb_scheduled_events
          SET status = 'scheduled'
          WHERE status = 'locked'
          |]
      ]

-- | Inserts new scheduled events.
--
--   * 'SESOneOff': inserts a single one-off scheduled event with its webhook,
--     scheduled time, payload, retry configuration, headers and comment.
--   * 'SESCron': inserts one row per seed (trigger name and scheduled time)
--     into the cron events table with a single statement. Conflicting rows
--     are skipped (@ON CONFLICT DO NOTHING@). An empty list of seeds is a
--     no-op.
insertScheduledEventTx :: ScheduledEventSeed -> Q.TxE QErr ()
insertScheduledEventTx = \case
  SESOneOff CreateScheduledEvent{..} ->
    Q.unitQE defaultTxErrorHandler
       [Q.sql|
        INSERT INTO hdb_catalog.hdb_scheduled_events
        (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
        VALUES
        ($1, $2, $3, $4, $5, $6)
       |] ( Q.AltJ cseWebhook
          , cseScheduleAt
          , Q.AltJ csePayload
          , Q.AltJ cseRetryConf
          , Q.AltJ cseHeaders
          , cseComment)
          False

  -- With no seeds the generated statement would have a VALUES clause without
  -- any row, which is not valid SQL; there is nothing to insert anyway.
  SESCron [] -> pure ()

  SESCron cronSeeds -> insertCronEventsTx cronSeeds
    where
      insertCronEventsTx :: [CronEventSeed] -> Q.TxE QErr ()
      insertCronEventsTx events = do
        -- the statement is built as text with the values inlined as literals
        let insertCronEventsSql = TB.run $ toSQL
              S.SQLInsert
                { siTable    = cronEventsTable
                , siCols     = map unsafePGCol ["trigger_name", "scheduled_time"]
                , siValues   = S.ValuesExp $ map (toTupleExp . toArr) events
                , siConflict = Just $ S.DoNothing Nothing
                , siRet      = Nothing
                }
        Q.unitQE defaultTxErrorHandler (Q.fromText insertCronEventsSql) () False
        where
          toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
          toTupleExp = S.TupleExp . map S.SELit

-- | Deletes the cron events of the given trigger that are scheduled after the
-- current time and have zero tries.
dropFutureCronEventsTx :: TriggerName -> Q.TxE QErr ()
dropFutureCronEventsTx name =
  let triggerNameParam = Identity name
  in Q.unitQE defaultTxErrorHandler
   [Q.sql|
    DELETE FROM hdb_catalog.hdb_cron_events
    WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
   |] triggerNameParam False


-- | The @hdb_catalog.hdb_cron_events@ table.
cronEventsTable :: QualifiedTable
cronEventsTable = QualifiedObject "hdb_catalog" (TableName "hdb_cron_events")

-- | Builds a filter on the @status@ column: an @IN@ check against the given
-- statuses, or a literal @True@ (no filtering) when the list is empty.
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter statuses
  | null statuses = S.BELit True
  | otherwise = S.BEIN statusColumn (map statusLiteral statuses)
  where
    statusColumn = S.SEIdentifier $ Identifier "status"
    statusLiteral = S.SELit . scheduledEventStatusToText

-- | An @ORDER BY@ on the @scheduled_time@ column, ascending.
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy = S.OrderByExp (byScheduledTime NE.:| [])
  where
    byScheduledTime =
      S.OrderByItem (S.SEIdentifier $ Identifier "scheduled_time") (Just S.OTAsc) Nothing

-- | Wraps a select so that it yields both the total row count and one page of
-- rows as JSON.
--
-- The given select becomes the CTE @count_cte@. A second CTE, @limit_cte@,
-- selects everything from @count_cte@ with the pagination's limit and offset
-- applied (each only when present). The resulting select has two columns: the
-- count of rows in @count_cte@, and the rows of @limit_cte@ aggregated with
-- @json_agg@, falling back to the literal @[]@ when the aggregate is null.
mkPaginationSelectExp
  :: S.Select
  -> ScheduledEventPagination
  -> S.Select
mkPaginationSelectExp allRowsSelect pagination =
  S.mkSelect
    { S.selCTEs =
        [ (S.toAlias allRowsAlias, allRowsSelect)
        , (S.toAlias pageAlias, pageSelect)
        ]
    , S.selExtr = [totalCountExtractor, pageRowsExtractor]
    }
  where
    allRowsAlias = Identifier "count_cte"
    pageAlias = Identifier "limit_cte"

    -- a select of the given extractors from one of the two CTEs
    selectFromCte cteAlias extractors = S.mkSelect
      { S.selExtr = extractors
      , S.selFrom = Just $ S.mkIdenFromExp cteAlias
      }

    pageSelect = (selectFromCte allRowsAlias [S.selectStar])
      { S.selLimit = fmap (S.LimitExp . S.intToSQLExp) (_sepLimit pagination)
      , S.selOffset = fmap (S.OffsetExp . S.intToSQLExp) (_sepOffset pagination)
      }

    totalCountExtractor =
      S.Extractor
        (S.SESelect $ selectFromCte allRowsAlias [S.Extractor S.countStar Nothing])
        Nothing

    pageRowsExtractor =
      S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect aggregatedPage)) Nothing
      where
        aggregatedPage =
          selectFromCte pageAlias
            [S.Extractor (S.SEUnsafe "json_agg(row_to_json(limit_cte.*))") Nothing]

-- | Converts a @(count, JSON-decoded value)@ result row into a
-- 'WithTotalCount'.
withCount :: (Int, Q.AltJ a) -> WithTotalCount a
withCount (total, rows) = WithTotalCount total (Q.getAltJ rows)

-- | Fetches one page of one-off scheduled events together with the total
-- number of matching events. Events are filtered by the given statuses (no
-- filtering when the list is empty) and ordered by scheduled time.
getOneOffScheduledEventsTx
  :: ScheduledEventPagination
  -> [ScheduledEventStatus]
  -> Q.TxE QErr (WithTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses =
  withCount . Q.getRow <$> Q.withQE defaultTxErrorHandler paginatedQuery () False
  where
    oneOffEventsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events"
    matchingEvents = S.mkSelect
      { S.selExtr = [S.selectStar]
      , S.selFrom = Just $ S.mkSimpleFromExp oneOffEventsTable
      , S.selWhere = Just $ S.WhereFrag $ mkScheduledEventStatusFilter statuses
      , S.selOrderBy = Just scheduledTimeOrderBy
      }
    paginatedQuery =
      Q.fromBuilder $ toSQL $ mkPaginationSelectExp matchingEvents pagination

-- | Fetches one page of the cron events of the given trigger together with
-- the total number of matching events. Events are filtered by trigger name
-- and by the given statuses (no status filtering when the list is empty), and
-- ordered by scheduled time.
getCronEventsTx
  :: TriggerName
  -> ScheduledEventPagination
  -> [ScheduledEventStatus]
  -> Q.TxE QErr (WithTotalCount [CronEvent])
getCronEventsTx triggerName pagination status =
  withCount . Q.getRow <$> Q.withQE defaultTxErrorHandler paginatedQuery () False
  where
    belongsToTrigger =
      S.BECompare S.SEQ
        (S.SEIdentifier $ Identifier "trigger_name")
        (S.SELit $ triggerNameToTxt triggerName)
    hasWantedStatus = mkScheduledEventStatusFilter status
    matchingEvents = S.mkSelect
      { S.selExtr = [S.selectStar]
      , S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable
      , S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp belongsToTrigger hasWantedStatus
      , S.selOrderBy = Just scheduledTimeOrderBy
      }
    paginatedQuery =
      Q.fromBuilder $ toSQL $ mkPaginationSelectExp matchingEvents pagination

-- | Deletes the event with the given id from the one-off scheduled events
-- table or from the cron events table, depending on the event type.
deleteScheduledEventTx
  :: ScheduledEventId -> ScheduledEventType -> Q.TxE QErr ()
deleteScheduledEventTx eventId eventType =
  case eventType of
    OneOff -> deleteWith [Q.sql|
      DELETE FROM hdb_catalog.hdb_scheduled_events
       WHERE id = $1
    |]
    Cron -> deleteWith [Q.sql|
      DELETE FROM hdb_catalog.hdb_cron_events
       WHERE id = $1
    |]
  where
    -- runs a delete statement with the event id as its only parameter
    deleteWith deleteQuery =
      Q.unitQE defaultTxErrorHandler deleteQuery (Identity eventId) False

-- | The columns selected from an invocation logs table, each qualified with
-- the given table: @id@, @event_id@, @status@, @request@, @response@ and
-- @created_at@. The @request@ and @response@ columns carry a @json@ type
-- annotation.
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
  [ S.Extractor columnExp Nothing | columnExp <- columnExps ]
  where
    column = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier
    asJson columnExp = S.SETyAnn columnExp $ S.TypeAnn "json"
    columnExps =
      [ column "id"
      , column "event_id"
      , column "status"
      , asJson $ column "request"
      , asJson $ column "response"
      , column "created_at"
      ]

-- | Builds the condition that the @event_id@ column of the given table equals
-- the given event id.
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId = S.BECompare S.SEQ eventIdColumn eventIdLiteral
  where
    eventIdColumn = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id"
    eventIdLiteral = S.SELit $ unEventId eventId

-- | Fetches one page of invocation logs together with the total number of
-- matching logs, ordered by @created_at@ descending.
--
-- Which logs are selected depends on the 'GetInvocationsBy' value:
--
--   * 'GIBEventId': the logs of a single event, read from the invocation logs
--     table that matches the event type.
--   * 'GIBEvent' 'SEOneOff': all logs of one-off scheduled events.
--   * 'GIBEvent' ('SECron' name): the logs of all cron events of the named
--     trigger, found by joining the cron invocation logs with the cron events
--     table.
getInvocationsTx
  :: GetInvocationsBy
  -> ScheduledEventPagination
  -> Q.TxE QErr (WithTotalCount [ScheduledEventInvocation])
getInvocationsTx invocationsBy pagination = do
  let sql = Q.fromBuilder $ toSQL $ mkPaginationSelectExp allRowsSelect pagination
  -- The SQL text is built per call with the event id / trigger name inlined
  -- as literals, so it is run unprepared, like the other dynamically built
  -- queries in this module.
  (withCount . Q.getRow) <$> Q.withQE defaultTxErrorHandler sql () False
  where
    createdAtOrderBy table =
      let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at"
      in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem createdAtCol (Just S.OTDesc) Nothing

    oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
    cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"

    allRowsSelect = case invocationsBy of
      GIBEventId eventId eventType ->
        let table = case eventType of
              OneOff -> oneOffInvocationsTable
              Cron   -> cronInvocationsTable
        in S.mkSelect
           { S.selExtr = invocationFieldExtractors table
           , S.selFrom = Just $ S.mkSimpleFromExp table
           , S.selOrderBy = Just $ createdAtOrderBy table
           , S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
           }

      GIBEvent event -> case event of
        SEOneOff ->
          let table = oneOffInvocationsTable
          in S.mkSelect
             { S.selExtr = invocationFieldExtractors table
             , S.selFrom = Just $ S.mkSimpleFromExp table
             , S.selOrderBy = Just $ createdAtOrderBy table
             }
        SECron triggerName ->
          let invocationTable = cronInvocationsTable
              eventTable = cronEventsTable
              -- invocation logs are tied to their event through event_id
              joinCondition = S.JoinOn $ S.BECompare S.SEQ
                (S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id")
                (S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id")
              joinTables =
                S.JoinExpr (S.FISimple invocationTable Nothing) S.Inner
                     (S.FISimple eventTable Nothing) joinCondition
              triggerBoolExp = S.BECompare S.SEQ
                (S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name"))
                (S.SELit $ triggerNameToTxt triggerName)

          in S.mkSelect
             { S.selExtr = invocationFieldExtractors invocationTable
             , S.selFrom = Just $ S.FromExp [S.FIJoin joinTables]
             , S.selWhere = Just $ S.WhereFrag triggerBoolExp
             , S.selOrderBy = Just $ createdAtOrderBy invocationTable
             }
